package com.example.demo.stream.deal;

import com.example.demo.entity.EmployeePo;
import org.apache.flink.api.common.functions.AggregateFunction;

import java.util.HashMap;
import java.util.Map;

public class CountAggregate implements AggregateFunction<EmployeePo, Map<String, Long>, Map<String, Long>> {

        /**
         * 创建累加器
         *
         * @return 累加器对象
         */
        @Override
        public Map<String, Long> createAccumulator() {
            // 创建累加器
            return new HashMap<>();
        }

        /**
         * 每次流循环中 操作累加器
         *
         * @param value       当前流循环对象
         * @param accumulator 累加器对象
         * @return 累加器数据
         */
        @Override
        public Map<String, Long> add(EmployeePo value, Map<String, Long> accumulator) {
            String userName = value.getUserName();
            // 属于本窗口的数据来一条累加一次，并返回累加器
            if (accumulator.containsKey(userName)) {
                accumulator.put(userName, accumulator.get(userName) + 1);
            } else {
                accumulator.put(userName, 1L);
            }
            return accumulator;
        }

        /**
         * 获取最终操作结果
         *
         * @param accumulator 累加器对象
         * @return 最终结果
         */
        @Override
        public Map<String, Long> getResult(Map<String, Long> accumulator) {
            // 窗口闭合时，增量聚合结束，将计算结果发送到下游
            return accumulator;
        }

        @Override
        public Map<String, Long> merge(Map<String, Long> a, Map<String, Long> b) {
            return null;
        }
    }